/*
 * Copyright Debezium Authors.
 *
 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 */
package io.debezium.connector.binlog;

import static io.debezium.junit.EqualityCheck.LESS_THAN;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import io.debezium.config.Configuration;
import io.debezium.connector.binlog.util.BinlogTestConnection;
import io.debezium.connector.binlog.util.TestHelper;
import io.debezium.connector.binlog.util.UniqueDatabase;
import io.debezium.data.Envelope;
import io.debezium.embedded.DebeziumEngineTestUtils.CompletionResult;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.relational.RelationalDatabaseConnectorConfig;

@SkipWhenDatabaseVersion(check = LESS_THAN, major = 8, minor = 0, patch = 20, reason = "MySQL 8.0.20 started supporting binlog compression")
public abstract class BinlogTransactionPayloadIT<C extends SourceConnector> extends AbstractBinlogConnectorIT<C> {

    private static final UUID PRODUCT_CODE = UUID.randomUUID();
    private static final String PRODUCT_NAME = "robot";
    private static final float PRODUCT_WEIGHT = 1.304f;

    private static final String PRODUCT_INSERT_STMT_1 = "INSERT INTO products (name, description, weight, code) VALUES ('" + PRODUCT_NAME + "', 'Toy robot', " +
            PRODUCT_WEIGHT + ", uuid_to_bin('" + PRODUCT_CODE + "'));";
    private static final String PRODUCT_INSERT_STMT_1_NO_UUID = "INSERT INTO products (name, description, weight) VALUES ('" + PRODUCT_NAME + "', 'Toy robot', " +
            PRODUCT_WEIGHT + ");";
    private static final String CUSTOMER_INSERT_STMT_1 = "INSERT INTO customers (first_name, last_name, email) VALUES ('Nitin', 'Agarwal', 'test1@abc.com' ); ";
    private static final String CUSTOMER_INSERT_STMT_2 = "INSERT INTO customers (first_name, last_name, email) VALUES ('Rajesh', 'Agarwal', 'test2@abc.com' ); ";
    private static final String ORDER_INSERT_STMT_1 = "INSERT INTO orders (order_date, purchaser, quantity, product_id) VALUES ('2016-01-16', 1001, 1, 1); ";

    private static final String CUSTOMER_UPDATE_STMT_1 = "UPDATE customers set first_name = 'Nitin1' where id = 1001; ";
    private static final String CUSTOMER_DELETE_STMT_1 = "DELETE from customers where id = 1001; ";

    private static final String ORDER_UPDATE_STMT_1 = "UPDATE orders set order_date = '2017-01-16' where order_number = 10001; ";
    private static final String ORDER_DELETE_STMT_1 = "DELETE from orders where order_number = 10001; ";

    private static final Path SCHEMA_HISTORY_PATH = Files.createTestingPath("file-schema-history-tp.txt").toAbsolutePath();

    private static final String SERVER_NAME = "transactionpayload_it";
    private final UniqueDatabase DATABASE = TestHelper.getUniqueDatabase(SERVER_NAME, "transactionpayload_test").withDbHistoryPath(SCHEMA_HISTORY_PATH);

    private Configuration config;

    @BeforeEach
    void beforeEach() throws TimeoutException, IOException, SQLException, InterruptedException {
        stopConnector();
        DATABASE.createAndInitialize();
        initializeConnectorTestFramework();
        Files.delete(SCHEMA_HISTORY_PATH);
    }

    @AfterEach
    void afterEach() throws SQLException {
        try {
            stopConnector();
            // MariaDB's binlog compression is set globally, so we need to toggle this off after the test.
            try (BinlogTestConnection db = getTestDatabaseConnection(DATABASE.getDatabaseName())) {
                db.setBinlogCompressionOff();
            }
        }
        finally {
            Files.delete(SCHEMA_HISTORY_PATH);
        }
    }

    @Test
    void shouldCaptureMultipleWriteEvents() throws Exception {
        config = DATABASE.defaultConfig()
                .with(BinlogConnectorConfig.SNAPSHOT_MODE, BinlogConnectorConfig.SnapshotMode.NEVER)
                .build();

        start(getConnectorClass(), config);

        Debug.enable();
        assertConnectorIsRunning();

        int numCreateDatabase = 1;
        int numCreateTables = 3;

        SourceRecords records = consumeRecordsByTopic(numCreateDatabase + numCreateTables);
        assertThat(records).isNotNull();
        records.forEach(this::validate);

        try (BinlogTestConnection db = getTestDatabaseConnection(DATABASE.getDatabaseName())) {
            try (JdbcConnection connection = db.connect()) {
                // We exclude the code column because the uuid_to_bin function isn't something that is
                // in MariaDB as UUID is a basic database type now.
                String sql = db.isMariaDb() ? PRODUCT_INSERT_STMT_1_NO_UUID : PRODUCT_INSERT_STMT_1;
                db.setBinlogCompressionOn();
                connection.execute(CUSTOMER_INSERT_STMT_1, CUSTOMER_INSERT_STMT_2, sql,
                        ORDER_INSERT_STMT_1, CUSTOMER_UPDATE_STMT_1, ORDER_UPDATE_STMT_1, ORDER_DELETE_STMT_1,
                        CUSTOMER_DELETE_STMT_1);
            }
        }
        SourceRecords dmlRecords = consumeRecordsByTopic(10);
        List<SourceRecord> customerDmls = dmlRecords.recordsForTopic(DATABASE.topicForTable("customers"));
        List<SourceRecord> productDmls = dmlRecords.recordsForTopic(DATABASE.topicForTable("products"));
        List<SourceRecord> orderDmls = dmlRecords.recordsForTopic(DATABASE.topicForTable("orders"));

        assertThat(customerDmls).hasSize(5);
        assertThat(productDmls).hasSize(1);
        Struct product = ((Struct) productDmls.get(0).value()).getStruct(Envelope.FieldName.AFTER);
        assertThat(product.get("id")).isInstanceOf(Integer.class);
        assertThat(product.get("name")).isEqualTo(PRODUCT_NAME);
        assertThat(product.get("weight")).isEqualTo(PRODUCT_WEIGHT);
        if (!isMariaDb()) {
            assertThat(((ByteBuffer) product.get("code")).array()).isEqualTo(uuidToByteArray(PRODUCT_CODE));
        }
        assertThat(orderDmls).hasSize(4);
    }

    private byte[] uuidToByteArray(UUID uuid) {
        ByteBuffer buffer = ByteBuffer.wrap(new byte[16]);
        buffer.putLong(uuid.getMostSignificantBits());
        buffer.putLong(uuid.getLeastSignificantBits());
        return buffer.array();
    }

    @Test
    void shouldCorrectlySkipEventsInCompressedTransaction() throws Exception {
        config = DATABASE.defaultConfig()
                .with(BinlogConnectorConfig.SNAPSHOT_MODE, BinlogConnectorConfig.SnapshotMode.NEVER)
                // Since we rely on event counts to determine where to stop and restart,
                // we need to ensure each event is processed individually
                .with(BinlogConnectorConfig.MAX_BATCH_SIZE, "1")
                // Disable schema changes to reduce number of events to process and easier debugging
                .with(RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .build();

        // Start the connector with a completion callback that stops it partway through processing events
        CompletionResult completion = new CompletionResult();
        start(getConnectorClass(), config, completion, (record) -> {
            // Stop before processing the 3rd customer insert (id 1003)
            if (record.topic().contains("customers")) {
                Struct key = (Struct) record.key();
                Number id = (Number) key.get("id");
                if (id != null && id.intValue() == 1003) {
                    return true;
                }
            }
            return false;
        });

        assertConnectorIsRunning();

        // Execute multiple inserts in a single compressed transaction
        try (BinlogTestConnection db = getTestDatabaseConnection(DATABASE.getDatabaseName())) {
            try (JdbcConnection connection = db.connect()) {
                db.setBinlogCompressionOn();
                // Insert 5 customer records in a single transaction so they are compressed together
                final Connection jdbc = connection.connection();
                connection.setAutoCommit(false);
                final Statement statement = jdbc.createStatement();
                statement.executeUpdate(CUSTOMER_INSERT_STMT_1); // id 1001
                statement.executeUpdate(CUSTOMER_INSERT_STMT_2); // id 1002
                statement.executeUpdate("INSERT INTO customers (first_name, last_name, email) VALUES ('John', 'Doe', 'test3@abc.com')"); // id 1003
                statement.executeUpdate("INSERT INTO customers (first_name, last_name, email) VALUES ('Jane', 'Smith', 'test4@abc.com')"); // id 1004
                statement.executeUpdate("INSERT INTO customers (first_name, last_name, email) VALUES ('Bob', 'Johnson', 'test5@abc.com')"); // id 1005
                jdbc.commit();
                connection.setAutoCommit(true);
            }
        }

        // Should consume only 2 inserts before stopping (1001 and 1002)
        SourceRecords records = consumeRecordsByTopic(2); // 2 customer inserts from the compressed transaction
        List<SourceRecord> customerInserts = records.recordsForTopic(DATABASE.topicForTable("customers"));
        assertThat(customerInserts).hasSize(2);

        Struct customer1 = ((Struct) customerInserts.get(0).value()).getStruct(Envelope.FieldName.AFTER);
        assertThat(customer1.get("id")).isEqualTo(1001);
        assertThat(customer1.get("first_name")).isEqualTo("Nitin");

        Struct customer2 = ((Struct) customerInserts.get(1).value()).getStruct(Envelope.FieldName.AFTER);
        assertThat(customer2.get("id")).isEqualTo(1002);
        assertThat(customer2.get("first_name")).isEqualTo("Rajesh");

        // Verify the offset shows the correct number of events processed from the compressed transaction
        // After processing 2 inserts (each with a table map event), the event counter should be 4
        // (2 table map events + 2 write rows events)
        SourceRecord lastRecord = customerInserts.get(1);
        Object eventValue = lastRecord.sourceOffset().get("event");
        assertThat(eventValue).isNotNull();
        assertThat(((Number) eventValue).intValue()).isEqualTo(4);

        // Verify that the connector has stopped
        completion.await(10, TimeUnit.SECONDS);
        assertThat(completion.hasCompleted()).isTrue();
        assertThat(completion.hasError()).isTrue();
        assertThat(completion.success()).isFalse();
        assertNoRecordsToConsume();
        assertConnectorNotRunning();

        stopConnector();

        // Restart the connector - it should skip the first 2 events and start from the 3rd (id 1003)
        start(getConnectorClass(), config);
        assertConnectorIsRunning();

        // Should consume the remaining 3 inserts (1003, 1004, 1005)
        records = consumeRecordsByTopic(3);
        customerInserts = records.recordsForTopic(DATABASE.topicForTable("customers"));
        assertThat(customerInserts).hasSize(3);

        // Verify the first record after restart is id 1003
        Struct customer3 = ((Struct) customerInserts.get(0).value()).getStruct(Envelope.FieldName.AFTER);
        assertThat(customer3.get("id")).isEqualTo(1003);
        assertThat(customer3.get("first_name")).isEqualTo("John");

        Struct customer4 = ((Struct) customerInserts.get(1).value()).getStruct(Envelope.FieldName.AFTER);
        assertThat(customer4.get("id")).isEqualTo(1004);
        assertThat(customer4.get("first_name")).isEqualTo("Jane");

        Struct customer5 = ((Struct) customerInserts.get(2).value()).getStruct(Envelope.FieldName.AFTER);
        assertThat(customer5.get("id")).isEqualTo(1005);
        assertThat(customer5.get("first_name")).isEqualTo("Bob");
    }
}
